package io.github.tri5m.tucache.core.pool;

import io.github.tri5m.tucache.core.config.TuCacheProfiles;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @title: default thread pool for all asynchronous tasks generated by tu-cache
 * 构建一个自定义的动态线程池，工作线程数量在 corePoolSize-maxPoolSize根据队列积压长度动态调整。
 * @author: trifolium.wang
 * @date: 2023/12/5
 * @modified:
 */
@Getter
@Slf4j
public class TucacheDefaultThreadPool {

    private static TucacheDefaultThreadPool instance;

    private ThreadPoolExecutor pool;

    private TucacheDefaultThreadPool() {

    }

    public static synchronized TucacheDefaultThreadPool getInstance(TuCacheProfiles tuCacheProfiles) {
        if (instance == null) {
            instance = new TucacheDefaultThreadPool();
            int minThreadNum = tuCacheProfiles.getPool().getCoreThreadNum();
            if (minThreadNum <= 0) {
                minThreadNum = 1;
            }
            int maxThreadNum = tuCacheProfiles.getPool().getMaxThreadNum();
            if (maxThreadNum <= minThreadNum) {
                maxThreadNum = minThreadNum;
            }
            int finalMaxThreadNum = maxThreadNum;
            instance.pool = new ThreadPoolExecutor(minThreadNum, finalMaxThreadNum,
                    tuCacheProfiles.getPool().getKeepAliveTime(), TimeUnit.MILLISECONDS,
                    new LinkedBlockingDeque<Runnable>(Integer.MAX_VALUE) {
                        private final ReentrantLock lock = new ReentrantLock();

                        @Override
                        public boolean offer(Runnable e) {
                            lock.lock();
                            try {
                                // 策略选择为，如果没有达到最大线程数量，且当队列积压任务超过了最大线程数则增加一个新工作线程
                                if (instance.pool.getPoolSize() < finalMaxThreadNum
                                        && this.size() >= finalMaxThreadNum) {
                                    return false;
                                }

                                return offerLast(e);
                            } finally {
                                lock.unlock();
                            }
                        }
                    },
                    new ThreadFactory() {
                        private final AtomicLong threadNumber = new AtomicLong(1);

                        @Override
                        public Thread newThread(Runnable r) {
                            final Thread t = new Thread(null, r, "tu-cache-pool-" + threadNumber.getAndIncrement());
                            t.setDaemon(false);
                            //优先级
                            if (Thread.NORM_PRIORITY != t.getPriority()) {
                                // 标准优先级
                                t.setPriority(Thread.NORM_PRIORITY);
                            }
                            return t;
                        }

                    },
                    (r, executor) -> {
                        log.error("tu-cache thread pool is full.");
                        new ThreadPoolExecutor.AbortPolicy().rejectedExecution(r, executor);
                    });
        }

        return instance;
    }

}
